 1. Flink SQL
   
   1). What are the Table API and Flink SQL?
   Flink itself is a unified batch/stream processing framework, so the Table API and SQL form
its unified high-level API for both batch and stream processing.
   The Table API is a query API embedded in the Java and Scala languages. It lets us compose
queries from relational operators (such as select, filter, and join) in a very intuitive way.
Flink SQL, in turn, lets us write SQL directly in code to express queries. Flink's SQL support
is based on Apache Calcite (an open-source Apache SQL parser), which implements the SQL standard.
   Whether the input is batch or streaming, a query specified in either API has the same
semantics and produces the same result.
   2). Getting-started code:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.11.1</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
		
   Dependency notes:
   flink-table-api-java-bridge_2.12: the bridge, responsible for connecting the Table API
to the DataStream/DataSet API; there is one variant per language (Java and Scala).
   flink-table-planner-blink_2.12: the planner, the core part of the Table API; it provides
the runtime environment and the planner that generates the program's execution plan.
   In a production environment the planner already ships in the lib directory by default,
so only the bridge is needed there.
   flink-table: the base dependency of the Flink Table ecosystem.
   Code walkthrough:
   1. Create the Flink execution environment env.
   2. From env, create the Table environment tEnv:
   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
   Alternatively, create a Blink-planner streaming environment (Blink-Streaming-Query) or a
Blink-planner batch environment (Blink-Batch-Query):
        EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
//                .inBatchMode()
                .withBuiltInCatalogName("default_catalog")
                .withBuiltInDatabaseName("default_database")
                .build()
   3. Create a streaming source:
	DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
		@Override
		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
			while (true) {
				ctx.collect(new Tuple2<>("name", 10));
				Thread.sleep(1000);
			}
		}

		@Override
		public void cancel() {
		}
	});
   4. Turn the stream into a Table:
   (1). Table API style:
   Table table = tEnv.fromDataStream(data, $("name"), $("age"));
   (2). SQL style (register the stream as a temporary view first):
	tEnv.createTemporaryView("userss", data, $("name"), $("age"));
	String s = "select name from userss";
	Table table = tEnv.sqlQuery(s);
   5. Query the data in the Table:
   (1). Table API style:
   Table name = table.select($("name"));
   (2). SQL style: in SQL the projection is part of the query itself, so the sqlQuery call
in step 4 already performs it.
   6. Convert the Table back into a DataStream:
   DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(name, Row.class);
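   The Boolean flag in the retract stream marks each record as an insert (true) or a retraction (false); a downstream consumer rebuilds the current result by applying the messages in order. A minimal plain-Java sketch of that convention (no Flink dependencies; class and value names here are made up for illustration):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RetractDemo {
    //Applies a list of (flag, row) retract messages to a materialized view:
    //true adds the row, false removes it.
    static Set<String> materialize(List<Map.Entry<Boolean, String>> messages) {
        Set<String> view = new LinkedHashSet<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) view.add(m.getValue());   // true  -> insert
            else view.remove(m.getValue());           // false -> retract
        }
        return view;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> msgs = List.of(
                new SimpleEntry<>(true, "name,10"),   // first result
                new SimpleEntry<>(false, "name,10"),  // retract the old result
                new SimpleEntry<>(true, "name,20"));  // emit the updated result
        System.out.println(materialize(msgs)); // prints [name,20]
    }
}
```

This is why toRetractStream prints pairs like (true, name): updates to an aggregate arrive as a retraction of the old row followed by an insertion of the new one.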
package com.lagou.table;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        //Create the Flink execution environment env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Create the Table environment tEnv from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //Create a streaming source
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });
        //Turn the stream into a Table
        Table table = tEnv.fromDataStream(data, $("name"), $("age"));
        //Query the data in the Table
        Table name = table.select($("name"));
        //Convert the result to a retract stream so it can be printed to the console
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(name, Row.class);

        //SQL style
/*        tEnv.createTemporaryView("userss", data, $("name"), $("age"));
        String s = "select name from userss";
        Table table1 = tEnv.sqlQuery(s);
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table1, Row.class);*/
        result.print();
        env.execute();
    }
}

   3). External connections
   (1). Connectors
   Name            Version            Maven dependency                 SQL Client JAR
   Filesystem                         Built-in                         Built-in
   Elasticsearch   6                  flink-connector-elasticsearch6   Download
   Elasticsearch   7                  flink-connector-elasticsearch7   Download
   Apache Kafka    0.10               flink-connector-kafka-0.10       Download
   Apache Kafka    0.11               flink-connector-kafka-0.11       Download
   Apache Kafka    0.11+ (universal)  flink-connector-kafka            Download
   Apache HBase    1.4.3              flink-connector-hbase            Download
   JDBC                               flink-connector-jdbc             Download
   (2). Formats
   Name                  Maven dependency   SQL Client JAR
   Old CSV (for files)   Built-in           Built-in
   CSV (for Kafka)       flink-csv          Built-in
   JSON                  flink-json         Built-in
   Apache Avro           flink-avro         Download
   1. Data Query Language (DQL)
   A DQL query block is built from a SELECT clause, a FROM clause, and a WHERE clause:
   SELECT <column list>
   FROM <table or view>
   WHERE <predicate>
   2. Data Manipulation Language (DML)
   DML has three main forms:
   1). Insert: INSERT
   2). Update: UPDATE
   3). Delete: DELETE
   3. Data Definition Language (DDL)
   DDL creates the various objects in a database: tables, views, indexes, synonyms,
   clusters, and so on, e.g.:
   CREATE TABLE/VIEW/INDEX/SYN/CLUSTER
   DDL statements are committed implicitly and cannot be rolled back!
   4. Data Control Language (DCL)
   DCL grants or revokes privileges for accessing the database, controls when and how
   transactions take effect, monitors the database, and so on, e.g. GRANT and REVOKE.
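   One illustrative statement for each of the four SQL categories above (the table and column names here are hypothetical):

```sql
-- DQL: query data
SELECT name, age FROM users WHERE age > 18;
-- DML: modify data
INSERT INTO users VALUES ('tom', 20);
UPDATE users SET age = 21 WHERE name = 'tom';
DELETE FROM users WHERE name = 'tom';
-- DDL: define objects (implicitly committed)
CREATE TABLE users (name VARCHAR(50), age INT);
-- DCL: control access privileges
GRANT SELECT ON users TO some_user;
REVOKE SELECT ON users FROM some_user;
```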
   To register a table for an external system in the Catalog, simply call tableEnv.connect(),
passing in a ConnectorDescriptor, i.e. a connector descriptor. For the file system, Flink
already provides a connector out of the box, called FileSystem().
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.11.1</version>
        </dependency>
	tEnv.connect(new FileSystem().path("sensor.txt")) // define where the table data comes from (the external connection)
		.withFormat(new Csv()) // define how data read from the external system is parsed
		.withSchema(new Schema()
			.field("id", DataTypes.STRING())
			.field("timestamp", DataTypes.BIGINT())
			.field("temperature", DataTypes.DOUBLE())) // define the table schema
		.createTemporaryTable("inputTable"); // create a temporary table
   Connecting to Kafka:
	ConnectTableDescriptor descriptor = tEnv.connect(
			// declare the external system to connect to
			new Kafka()
				.version("universal")
				.topic("animal")
				.startFromEarliest()
				.property("bootstrap.servers", "linux122:9092")
	)
		// declare a format for this system
		.withFormat(
//			new Json()
			new Csv()
		)
		// declare the schema of the table
		.withSchema(
			new Schema()
//				.field("rowtime", DataTypes.TIMESTAMP(3))
//				.rowtime(new Rowtime()
//					.timestampsFromField("timestamp")
//					.watermarksPeriodicBounded(60000)
//				)
//				.field("user", DataTypes.BIGINT())
				.field("message", DataTypes.STRING())
		);
	// create a table with the given name
	descriptor.createTemporaryTable("MyUserTable");
	Table table1 = tEnv.sqlQuery("select * from MyUserTable");
	DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tEnv.toRetractStream(table1, Row.class);
	tuple2DataStream.print();
   4). Querying data
   (1). Table API
   Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html
   select/filter/as
	Table filtered = table.select($("name"), $("age")).filter($("age").mod(2).isEqual(0));
	// print the result to the console
	DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(filtered, Row.class);

	Table mingzi = table.select($("name").as("mingzi"));
	DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(mingzi, Row.class);
   (2). SQL
	tEnv.createTemporaryView("userss", data, $("name"), $("age"));
	String s = "select name,age from userss where mod(age,2)=0";
	Table table = tEnv.sqlQuery(s);
	DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);
   5). Writing a Table out
   (1). Writing to a file:
   Code:
	tEnv.connect(new FileSystem().path("D:\\data\\out.txt"))
		.withFormat(new Csv())
		.withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
		.createTemporaryTable("outputTable");
	filtered.executeInsert("outputTable");
package tablesql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class ToFileSystem {
    public static void main(String[] args) {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //tEnv
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
//                .inBatchMode()
                .withBuiltInCatalogName("default_catalog")
                .withBuiltInDatabaseName("default_database")
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //Create the source data
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int num = 0;
                while (true) {
                    num++;
                    ctx.collect(new Tuple2<>("name" + num, num));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        Table nameTable = tEnv.fromDataStream(data, $("name"), $("num"));

        //Write the data to the file system
        tEnv.connect(new FileSystem().path("d:\\data\\output"))
                .withFormat(new Csv())
                .withSchema(
                        new Schema()
                                .field("name", DataTypes.STRING())
                                .field("num", DataTypes.INT())
                )
                .createTemporaryTable("tmpTable");

        nameTable.executeInsert("tmpTable");
    }
}

   Writing ORC output (the format Hive supports):
package com.lagou.bak;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;

import java.util.Properties;

public class StreamingWriteFileOrc {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(10000);
		env.setParallelism(1);
		DataStream<RowData> dataStream = env.addSource(new MySource());

		//Writer properties for the ORC format
		final Properties writerProps = new Properties();
		writerProps.setProperty("orc.compress", "LZ4");

		//Define the field types and names
		LogicalType[] orcTypes = new LogicalType[]{
				new IntType(), new DoubleType(), new VarCharType()};
		String[] fields = new String[]{"a1", "b2", "c3"};
		TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(
				RowType.of(orcTypes, fields));

		//Build the OrcBulkWriterFactory
		final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
				new RowDataVectorizer(typeDescription.toString(), orcTypes),
				writerProps,
				new Configuration());
		StreamingFileSink<RowData> orcSink = StreamingFileSink
				.forBulkFormat(new Path("d:\\data\\out"), factory) //e.g. file:///tmp/aaaa
				.build();
		dataStream.addSink(orcSink);
		env.execute();
	}

	public static class MySource implements SourceFunction<RowData> {
		@Override
		public void run(SourceContext<RowData> sourceContext) throws Exception {
			while (true) {
				GenericRowData rowData = new GenericRowData(3);
				rowData.setField(0, (int) (Math.random() * 100));
				rowData.setField(1, Math.random() * 100);
				rowData.setField(2, StringData.fromString(String.valueOf(Math.random() * 100)));
				sourceContext.collect(rowData);
				Thread.sleep(10);
			}
		}

		@Override
		public void cancel() {
		}
	}
}
   (2). Writing to Kafka
   Definition:
package tablesql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class ToKafka {
    public static void main(String[] args) {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //tEnv
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
//                .inBatchMode()
                .withBuiltInCatalogName("default_catalog")
                .withBuiltInDatabaseName("default_database")
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //Create the source data
        DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int num = 0;
                while (true) {
                    num++;
                    ctx.collect("name" + num);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        Table nameTable = tEnv.fromDataStream(data, $("name"));

        tEnv.connect(
                new Kafka()
                        .version("universal")
                        .topic("animal")
                        .startFromEarliest()
                        .property("bootstrap.servers", "linux121:9092")
        )
                .withFormat(new Csv())
                .withSchema(
                        new Schema().field("name", DataTypes.STRING())
                )
                .createTemporaryTable("animalTable");

        nameTable.executeInsert("animalTable");

    }
}

   (3). Writing to MySQL (for reference)
	CREATE TABLE MyUserTable (
	  ...
	) WITH (
	  'connector.type' = 'jdbc', -- required: specify this table type is jdbc
	  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
	  'connector.table' = 'jdbc_table_name', -- required: jdbc table name

	  -- optional: the class name of the JDBC driver to use to connect to this URL.
	  -- If not set, it will automatically be derived from the URL.
	  'connector.driver' = 'com.mysql.jdbc.Driver',

	  -- optional: jdbc user name and password
	  'connector.username' = 'name',
	  'connector.password' = 'password',

	  -- **the following are scan options, optional, used when reading from a table**

	  -- optional: SQL query / prepared statement.
	  -- If set, this will take precedence over the 'connector.table' setting.
	  'connector.read.query' = 'SELECT * FROM sometable',

	  -- These options must all be specified if any of them is specified. In addition,
	  -- partition.num must be specified. They describe how to partition the table when
	  -- reading in parallel from multiple tasks. partition.column must be a numeric,
	  -- date, or timestamp column from the table in question. Notice that lowerBound and
	  -- upperBound are just used to decide the partition stride, not for filtering the
	  -- rows in the table. So all rows in the table will be partitioned and returned.
	  'connector.read.partition.column' = 'column_name', -- optional: the column name used for partitioning the input
	  'connector.read.partition.num' = '50', -- optional: the number of partitions
	  'connector.read.partition.lower-bound' = '500', -- optional: the smallest value of the first partition
	  'connector.read.partition.upper-bound' = '1000', -- optional: the largest value of the last partition

	  -- optional: gives the reader a hint as to the number of rows that should be fetched
	  -- from the database per round trip when reading. If the value specified is zero,
	  -- the hint is ignored. The default value is zero.
	  'connector.read.fetch-size' = '100',

	  -- **the following are lookup options, optional, used in temporal join**

	  -- optional: max number of rows in the lookup cache; beyond this value, the oldest
	  -- rows are evicted. "cache.max-rows" and "cache.ttl" must both be specified if
	  -- either of them is. The cache is disabled by default.
	  'connector.lookup.cache.max-rows' = '5000',

	  -- optional: the max time to live for each row in the lookup cache; beyond this
	  -- time, the oldest rows expire. "cache.max-rows" and "cache.ttl" must both be
	  -- specified if either of them is. The cache is disabled by default.
	  'connector.lookup.cache.ttl' = '10s',

	  'connector.lookup.max-retries' = '3', -- optional: max retry times if a lookup query fails

	  -- **the following are sink options, optional, used when writing into a table**

	  -- optional: flush max size (includes all append, upsert and delete records);
	  -- beyond this number of records, the data is flushed. The default value is "5000".
	  'connector.write.flush.max-rows' = '5000',

	  -- optional: flush interval in milliseconds; after this time, an asynchronous
	  -- thread flushes the data. The default value is "0s", which means no
	  -- asynchronous flush thread will be scheduled.
	  'connector.write.flush.interval' = '2s',

	  -- optional: max retry times if writing records to the database fails
	  'connector.write.max-retries' = '3'
	)
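   Once such a DDL statement is assembled, it can be registered with tEnv.executeSql(ddl) and then written to with a regular INSERT INTO. A minimal sketch that only builds the DDL string, so it runs without Flink on the classpath; it keeps just the required options, and the table/column names are placeholders:

```java
public class JdbcSinkDdl {
    //Builds a minimal JDBC sink DDL with only the required connector options.
    static String buildDdl(String url, String table, String user, String password) {
        return "CREATE TABLE MyUserTable (\n"
                + "  name STRING,\n"
                + "  num INT\n"
                + ") WITH (\n"
                + "  'connector.type' = 'jdbc',\n"
                + "  'connector.url' = '" + url + "',\n"
                + "  'connector.table' = '" + table + "',\n"
                + "  'connector.username' = '" + user + "',\n"
                + "  'connector.password' = '" + password + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        String ddl = buildDdl("jdbc:mysql://localhost:3306/flink-test",
                "jdbc_table_name", "name", "password");
        System.out.println(ddl);
        //In a real job (requires the flink-connector-jdbc dependency):
        //tEnv.executeSql(ddl);
        //tEnv.executeSql("INSERT INTO MyUserTable SELECT name, num FROM someView");
    }
}
```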
		
   
	